package com.coupon.distribution.service.impl;

import com.alibaba.fastjson.JSON;
import com.coupon.common.constant.Constants;
import com.coupon.distribution.constant.CouponStatusEnum;
import com.coupon.distribution.dao.CouponDao;
import com.coupon.distribution.entity.Coupon;
import com.coupon.distribution.service.KafkaService;
import com.coupon.distribution.vo.CouponKafkaMessageVO;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.Optional;

/**
 * @author 王哲
 * @Contact 1121586359@qq.com
 * @ClassName KafkaServiceImpl.java
 * @create 2023年06月26日 下午3:30
 * @Description KafkaService实现 优惠券相关的Kafka服务接口实现
 * 核心思想：是将 Cache 中的 CouponTemplate 的状态变化同步到 DB 中
 * @Version V1.0
 */
@Slf4j
@Service
public class KafkaServiceImpl implements KafkaService {

    @Autowired
    private CouponDao couponDao;

    // 消费优惠券 Kafka 消息
    @Override
    @KafkaListener(topics = {Constants.TOPIC}, groupId = "coupon-1")
    public void consumeCouponKafkaMessage(ConsumerRecord<?, ?> record) {
        // 尝试获取消息体
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        // 如果存在消息体
        if (kafkaMessage.isPresent()) {
            // 获取消息体
            Object message = kafkaMessage.get();
            CouponKafkaMessageVO couponKafkaMessageVO =
                    JSON.parseObject(message.toString(), CouponKafkaMessageVO.class);
            // 打印日志
            log.info("CouponKafkaMessage: {}", message);

            // 根据状态处理消息
            CouponStatusEnum couponStatusEnum =
                    CouponStatusEnum.of(couponKafkaMessageVO.getStatus());

            switch (couponStatusEnum) {
                case USABLE:
                    break;
                case USED:
                    processUsedCoupons(couponKafkaMessageVO, couponStatusEnum);
                    break;
                case EXPIRED:
                    processExpiredCoupons(couponKafkaMessageVO, couponStatusEnum);
                    break;
            }

        }
    }

    /**
     * 处理已使用优惠券状态消息
     * @param kafkaMessage
     * @param status
     */
    private void processUsedCoupons(CouponKafkaMessageVO kafkaMessage,
                                    CouponStatusEnum status) {

        // TODO 给用户发送短信
        processCouponKafkaMessage(kafkaMessage, status);
    }

    /**
     * 处理过期优惠券状态消息
     * @param kafkaMessage
     * @param status
     */
    private void processExpiredCoupons(CouponKafkaMessageVO kafkaMessage,
                                    CouponStatusEnum status) {

        // TODO 给用户发送推送
        processCouponKafkaMessage(kafkaMessage, status);
    }

    /**
     * 处理优惠券状态消息
     *
     * @param kafkaMessage
     * @param status
     */
    private void processCouponKafkaMessage(CouponKafkaMessageVO kafkaMessage,
                                           CouponStatusEnum status) {

        List<Coupon> coupons = couponDao.findAllById(kafkaMessage.getIds());

        if (CollectionUtils.isEmpty(coupons) || coupons.size() != kafkaMessage.getIds().size()) {

            log.error("Can Not Find Right Coupon Info: {}",
                    JSON.toJSONString(kafkaMessage));
            // TODO 发送邮件
            return;
        }

        coupons.forEach(coupon -> coupon.setStatus(status));

        log.info("CouponKafkaMessage Op Coupon Count: {}",
                couponDao.saveAll(coupons).size());
    }

}
